{
  "cells": [
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "Copyright (c) Microsoft Corporation. All rights reserved. \n",
        "Licensed under the MIT License."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "# Using Azure Machine Learning Pipelines for Batch Inference on file input partitioned by folder structure\n",
        "\n",
        "In this notebook, we demonstrate how to make predictions on large quantities of data asynchronously using ML pipelines in Azure Machine Learning. Batch inference (or batch scoring) is a cost-effective way to run inference with high throughput for asynchronous applications. Batch prediction pipelines can scale to perform inference on terabytes of production data and are optimized for high-throughput, fire-and-forget predictions over a large collection of data.\n",
        "\n",
        "> **Tip**\n",
        "If your system requires low-latency processing (to process a single document or small set of documents quickly), use [real-time scoring](https://docs.microsoft.com/en-us/azure/machine-learning/v1/how-to-consume-web-service) instead of batch prediction.\n",
        "\n",
        "This example creates a sample dataset with a nested folder structure, where each folder name corresponds to an attribute of the files inside it. The batch inference job splits the files in the dataset according to these attributes, so that all files with the same value for the specified attributes form a single mini-batch.\n",
        "\n",
        "The outline of this notebook is as follows:\n",
        "\n",
        "- Create a dataset with a nested folder structure and use `partition_format` to map the folder structure to attributes of the files inside.\n",
        "- Run batch inference on each mini-batch defined by the folder structure.\n",
        "\n",
        "## Prerequisites\n",
        "If you are using an Azure Machine Learning Notebook VM, you are all set. Otherwise, go through the configuration notebook located at https://github.com/Azure/MachineLearningNotebooks first. This sets you up with a working config file that has information on your workspace, subscription ID, etc."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Connect to workspace"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from azureml.core.workspace import Workspace\n",
        "ws = Workspace.from_config()\n",
        "print('Workspace name: ' + ws.name, \n",
        "      'Azure region: ' + ws.location, \n",
        "      'Subscription id: ' + ws.subscription_id, \n",
        "      'Resource group: ' + ws.resource_group, sep = '\\n')\n",
        "\n",
        "datastore = ws.get_default_datastore()"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "import azureml.core\n",
        "print(azureml.core.VERSION)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Upload local test data to datastore\n",
        "The destination folder in the datastore is structured so that the name of each folder layer corresponds to a property of all the files inside the folder."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from azureml.core import Dataset\n",
        "\n",
        "datastore.upload('test_files/disco', 'dataset_partition_test/user1/winter', overwrite=True, show_progress=False)\n",
        "datastore.upload('test_files/orchestra', 'dataset_partition_test/user1/fall', overwrite=True, show_progress=False)\n",
        "datastore.upload('test_files/piano', 'dataset_partition_test/user2/summer', overwrite=True, show_progress=False)\n",
        "datastore.upload('test_files/spirituality', 'dataset_partition_test/user3/fall', overwrite=True, show_progress=False)\n",
        "datastore.upload('test_files/piano', 'dataset_partition_test/user4/spring', overwrite=True, show_progress=False)\n",
        "datastore.upload('test_files/piano', 'dataset_partition_test/user4/fall', overwrite=True, show_progress=False)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Create partitioned file dataset\n",
        "Create a file dataset partitioned by 'user', 'season', and 'genres', each of which corresponds to a path segment specified in `partition_format`. You can get a partition of the data by specifying the value of one or more partition keys. E.g., by specifying `user=user1 and genres=piano`, you can get all the files that match `dataset_partition_test/user1/*/piano.wav`."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "partitioned_file_dataset = Dataset.File.from_files(path=(datastore, 'dataset_partition_test/*/*/*.wav'),\n",
        "                                                   partition_format=\"dataset_partition_test/{user}/{season}/{genres}.wav\",\n",
        "                                                   validate=False)"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "partitioned_file_dataset.partition_keys"
      ]
    },
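    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "To make the role of `partition_format` more concrete, here is a minimal sketch in plain Python of how a format string like the one above could be matched against a file path to recover its partition attributes. This is an illustration only, not how the SDK implements it: the helper name `parse_partition_format` is hypothetical, and the sketch assumes that every plain `{key}` placeholder matches exactly one path segment.\n",
        "\n",
        "```python\n",
        "import re\n",
        "from string import Formatter\n",
        "\n",
        "\n",
        "def parse_partition_format(partition_format, path):\n",
        "    '''Hypothetical helper: return the partition attributes of path, or None if it does not match.'''\n",
        "    pattern = ''\n",
        "    # Formatter().parse yields (literal text, placeholder name, ...) pairs in order\n",
        "    for literal, key, _spec, _conversion in Formatter().parse(partition_format):\n",
        "        pattern += re.escape(literal)  # literal text must match exactly\n",
        "        if key is not None:\n",
        "            pattern += '(?P<' + key + '>[^/]+)'  # assumption: one path segment per key\n",
        "    match = re.fullmatch(pattern, path)\n",
        "    return match.groupdict() if match else None\n",
        "\n",
        "\n",
        "fmt = 'dataset_partition_test/{user}/{season}/{genres}.wav'\n",
        "print(parse_partition_format(fmt, 'dataset_partition_test/user1/winter/disco.wav'))\n",
        "# {'user': 'user1', 'season': 'winter', 'genres': 'disco'}\n",
        "```\n",
        "\n",
        "A path that does not follow the format yields `None` in this sketch."
      ]
    },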
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Create or Attach existing compute resource"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "import os\n",
        "from azureml.core.compute import AmlCompute, ComputeTarget\n",
        "\n",
        "# choose a name for your cluster\n",
        "compute_name = os.environ.get(\"AML_COMPUTE_CLUSTER_NAME\", \"cpu-cluster\")\n",
        "# environment variables are strings, so convert the node counts to int\n",
        "compute_min_nodes = int(os.environ.get(\"AML_COMPUTE_CLUSTER_MIN_NODES\", 0))\n",
        "compute_max_nodes = int(os.environ.get(\"AML_COMPUTE_CLUSTER_MAX_NODES\", 2))\n",
        "\n",
        "# This example uses a CPU VM. To use a GPU VM, set the SKU to Standard_NC6s_v3\n",
        "vm_size = os.environ.get(\"AML_COMPUTE_CLUSTER_SKU\", \"STANDARD_D2_V2\")\n",
        "\n",
        "\n",
        "if compute_name in ws.compute_targets:\n",
        "    compute_target = ws.compute_targets[compute_name]\n",
        "    if compute_target and type(compute_target) is AmlCompute:\n",
        "        print('found compute target. just use it. ' + compute_name)\n",
        "else:\n",
        "    print('creating a new compute target...')\n",
        "    provisioning_config = AmlCompute.provisioning_configuration(vm_size = vm_size,\n",
        "                                                                min_nodes = compute_min_nodes, \n",
        "                                                                max_nodes = compute_max_nodes)\n",
        "\n",
        "    # create the cluster\n",
        "    compute_target = ComputeTarget.create(ws, compute_name, provisioning_config)\n",
        "    \n",
        "    # can poll for a minimum number of nodes and for a specific timeout. \n",
        "    # if no min node count is provided it will use the scale settings for the cluster\n",
        "    compute_target.wait_for_completion(show_output=True, min_node_count=None, timeout_in_minutes=20)\n",
        "    \n",
        "    # For a more detailed view of current AmlCompute status, use get_status()\n",
        "    print(compute_target.get_status().serialize())"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Intermediate/Output Data"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from azureml.pipeline.core import Pipeline, PipelineData\n",
        "\n",
        "output_dir = PipelineData(name=\"file_dataset_inferences\", datastore=datastore)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Calculate total file size of each mini-batch partitioned by dataset partition key(s)\n",
        "The script sums up the total size of the files in each mini-batch."
      ]
    },
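    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "An entry script for `ParallelRunStep` typically defines `init()` and `run(mini_batch)`, and for a file dataset the mini-batch is a list of file paths. As a rough sketch of the core computation only, the hypothetical helper below sums the sizes of the files in one mini-batch and reports each file's share of the total. It is not the contents of `total_file_size.py`, which the next cell prints; the helper name and the returned tuple layout are assumptions made for illustration.\n",
        "\n",
        "```python\n",
        "import os\n",
        "import tempfile\n",
        "\n",
        "\n",
        "def summarize_file_sizes(paths):\n",
        "    '''Hypothetical helper: return (rows, total) where each row is (file name, size, ratio of total).'''\n",
        "    sizes = [os.path.getsize(p) for p in paths]\n",
        "    total = sum(sizes)\n",
        "    rows = [\n",
        "        (os.path.basename(p), size, size / total if total else 0.0)\n",
        "        for p, size in zip(paths, sizes)\n",
        "    ]\n",
        "    return rows, total\n",
        "\n",
        "\n",
        "# Try it on two throwaway files standing in for one mini-batch\n",
        "with tempfile.TemporaryDirectory() as tmp:\n",
        "    demo_paths = []\n",
        "    for name, n_bytes in [('a.wav', 100), ('b.wav', 300)]:\n",
        "        path = os.path.join(tmp, name)\n",
        "        with open(path, 'wb') as f:\n",
        "            f.write(b'0' * n_bytes)\n",
        "        demo_paths.append(path)\n",
        "    demo_rows, demo_total = summarize_file_sizes(demo_paths)\n",
        "\n",
        "print(demo_rows, demo_total)\n",
        "# [('a.wav', 100, 0.25), ('b.wav', 300, 0.75)] 400\n",
        "```\n",
        "\n",
        "In a real entry script, a computation like this would run inside `run(mini_batch)` and its rows would be returned so that `append_row` can collect them."
      ]
    },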
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "scripts_folder = \"Code\"\n",
        "script_file = \"total_file_size.py\"\n",
        "\n",
        "# peek at contents\n",
        "with open(os.path.join(scripts_folder, script_file)) as inference_file:\n",
        "    print(inference_file.read())"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Build and run the batch inference pipeline\n",
        "### Specify the environment to run the script\n",
        "Specify the azureml packages that the script requires as dependencies of the environment."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from azureml.core import Environment\n",
        "from azureml.core.runconfig import CondaDependencies, DEFAULT_CPU_IMAGE\n",
        "\n",
        "batch_conda_deps = CondaDependencies.create(pip_packages=[\"azureml-core\", \"azureml-dataset-runtime[fuse]\"])\n",
        "batch_env = Environment(name=\"batch_environment\")\n",
        "batch_env.python.conda_dependencies = batch_conda_deps\n",
        "batch_env.docker.base_image = DEFAULT_CPU_IMAGE"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Create the configuration to wrap the inference script\n",
        "The parameter `partition_keys` is a list containing a subset of the dataset partition keys, specifying how the input dataset is partitioned. Each combination of `partition_keys` values that occurs in the dataset forms one mini-batch. E.g., specifying `partition_keys=['user', 'genres']` results in 5 mini-batches: `user=user1 && genres=disco`, `user=user1 && genres=orchestra`, `user=user2 && genres=piano`, `user=user3 && genres=spirituality` and `user=user4 && genres=piano`."
      ]
    },
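    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "The grouping described above can be sketched in plain Python. The records mirror the six uploads made earlier, assuming each uploaded folder contains a single `.wav` file named after its genre (an assumption, since the test files are not listed in this notebook). `group_by_partition_keys` is a hypothetical helper for illustration, not an SDK function.\n",
        "\n",
        "```python\n",
        "from collections import defaultdict\n",
        "\n",
        "\n",
        "def group_by_partition_keys(records, partition_keys):\n",
        "    '''Hypothetical helper: group file paths by their values for the given partition keys.'''\n",
        "    mini_batches = defaultdict(list)\n",
        "    for record in records:\n",
        "        key = tuple(record[k] for k in partition_keys)\n",
        "        mini_batches[key].append(record['path'])\n",
        "    return dict(mini_batches)\n",
        "\n",
        "\n",
        "# Assumed layout: one file per uploaded folder, named after its genre\n",
        "records = [\n",
        "    {'user': 'user1', 'season': 'winter', 'genres': 'disco', 'path': 'user1/winter/disco.wav'},\n",
        "    {'user': 'user1', 'season': 'fall', 'genres': 'orchestra', 'path': 'user1/fall/orchestra.wav'},\n",
        "    {'user': 'user2', 'season': 'summer', 'genres': 'piano', 'path': 'user2/summer/piano.wav'},\n",
        "    {'user': 'user3', 'season': 'fall', 'genres': 'spirituality', 'path': 'user3/fall/spirituality.wav'},\n",
        "    {'user': 'user4', 'season': 'spring', 'genres': 'piano', 'path': 'user4/spring/piano.wav'},\n",
        "    {'user': 'user4', 'season': 'fall', 'genres': 'piano', 'path': 'user4/fall/piano.wav'},\n",
        "]\n",
        "\n",
        "batches = group_by_partition_keys(records, ['user', 'genres'])\n",
        "for key, paths in batches.items():\n",
        "    print(key, paths)\n",
        "print(len(batches), 'mini-batches')\n",
        "```\n",
        "\n",
        "Under these assumptions, the two `piano` files of `user4` share a mini-batch even though they sit in different season folders, because `season` is not among the chosen keys."
      ]
    },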
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from azureml.pipeline.steps import ParallelRunStep, ParallelRunConfig\n",
        "\n",
        "# In a real-world scenario, you'll want to shape your process per node and nodes to fit your problem domain.\n",
        "parallel_run_config = ParallelRunConfig(\n",
        "    source_directory=scripts_folder,\n",
        "    entry_script=script_file,  # the user script to run against each input\n",
        "    partition_keys=['user', 'genres'],\n",
        "    error_threshold=5,\n",
        "    output_action='append_row',\n",
        "    append_row_file_name=\"file_size_outputs.txt\",\n",
        "    environment=batch_env,\n",
        "    compute_target=compute_target, \n",
        "    node_count=2,\n",
        "    run_invocation_timeout=600\n",
        ")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Create the pipeline step"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "parallel_run_step = ParallelRunStep(\n",
        "    name='summarize-file-size',\n",
        "    inputs=[partitioned_file_dataset.as_named_input(\"partitioned_file_input\")],\n",
        "    output=output_dir,\n",
        "    parallel_run_config=parallel_run_config,\n",
        "    allow_reuse=False\n",
        ")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Run the pipeline"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from azureml.core import Experiment\n",
        "from azureml.pipeline.core import Pipeline\n",
        "\n",
        "pipeline = Pipeline(workspace=ws, steps=[parallel_run_step])\n",
        "\n",
        "pipeline_run = Experiment(ws, 'file-dataset-partition').submit(pipeline)"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "pipeline_run.wait_for_completion(show_output=True)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## View the prediction results\n",
        "In the `total_file_size.py` file above, you can see that the ResultList with the file name and the prediction result is returned. These results are written to the datastore specified in the `PipelineData` object as the output data, which in this case is named `file_dataset_inferences`. It contains the outputs from all of the worker nodes used in the compute cluster. You can download this data to view the results. The cell below shows only the first 10 rows."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "import pandas as pd\n",
        "import tempfile\n",
        "\n",
        "batch_run = pipeline_run.find_step_run(parallel_run_step.name)[0]\n",
        "batch_output = batch_run.get_output_data(output_dir.name)\n",
        "\n",
        "target_dir = tempfile.mkdtemp()\n",
        "batch_output.download(local_path=target_dir)\n",
        "result_file = os.path.join(target_dir, batch_output.path_on_datastore, parallel_run_config.append_row_file_name)\n",
        "\n",
        "df = pd.read_csv(result_file, delimiter=\",\", header=None)\n",
        "df.columns = [\"File Name\", \"File Size\", \"Ratio of Size in Partition\", \"user\", \"genres\", \"Total File Size of Partition\"]\n",
        "print(\"Prediction has\", df.shape[0], \"rows\")\n",
        "df.head(10)"
      ]
    }
  ],
  "metadata": {
    "authors": [
      {
        "name": "prsbjdev"
      }
    ],
    "category": "Other notebooks",
    "compute": [
      "AML Compute"
    ],
    "datasets": [
      "None"
    ],
    "deployment": [
      "None"
    ],
    "exclude_from_index": false,
    "framework": [
      "None"
    ],
    "friendly_name": "Batch inferencing file data partitioned by folder using ParallelRunStep",
    "index_order": 1,
    "kernelspec": {
      "display_name": "Python 3.8 - AzureML",
      "language": "python",
      "name": "python38-azureml"
    },
    "language_info": {
      "codemirror_mode": {
        "name": "ipython",
        "version": 3
      },
      "file_extension": ".py",
      "mimetype": "text/x-python",
      "name": "python",
      "nbconvert_exporter": "python",
      "pygments_lexer": "ipython3",
      "version": "3.8.16"
    }
  },
  "nbformat": 4,
  "nbformat_minor": 4
}